#include "shmtpool.h"
#include "base-sem.h"

void show_handler(int sig)
{
    printf("I got signal %d\n", sig);
}

int child_process(process_pool *pool, int index)
{
    task_st *ptask_self = NULL;
    int ret = 0;
    if (pool == NULL && pool->id == NULL)
    {
        return -1;
    }
    int pid = fork();
    if (pid == -1)
    {
        fprintf(stderr, "Fork fail\n");
        exit(-1);
    }
    if (pid > 0)
    {
        //parent
        return pid;
    }
    /*
	struct sigaction act, oldact;
    act.sa_handler = show_handler;
    sigaddset(&act.sa_mask, SIGQUIT); //见注(1)
    act.sa_flags = SA_RESETHAND | SA_NODEFER; //见注(2)
    //act.sa_flags = 0; //见注(3)
	sigaction(SIGINT, &act, &oldact);
	*/
    /*
	struct sigaction sa; 
	sa.sa_handler = SIG_IGN; 
	sa.sa_flags = SA_NOCLDWAIT; //进程退出自己回收资源，防止僵死进程
	sigemptyset(&sa.sa_mask); 
	sigaction(SIGCHLD, &sa, NULL); 
	*/

    int ppid = getpid();

    pool->shm_key = SHM_KEY;
    pool->shmid = shmget(pool->shm_key, sizeof(shared_st) + sizeof(task_st) * (pool->size), 0666 | S_IRUSR | S_IWUSR | IPC_CREAT);
    if (pool->shmid == -1)
    {
        fprintf(stderr, "shmget fail\n");
        //exit(-1);
    }

    pool->shared_mem = (shared_st *)shmat(pool->shmid, NULL, 0);
    if (pool->shared_mem == (void *)-1)
    {
        fprintf(stderr, "shmat fail\n");
        exit(-1);
    }

    ptask_self = pool->shared_mem->task_pro + index;
    ptask_self->index_pro = index;
    ptask_self->init_success = 1;
    printf("test2->pid:%d index=%d %p %d %d\n", ppid, index, ptask_self, ptask_self->index_pro, ptask_self->cur_task_status);

    while (pool->shared_mem->stop == false)
    {
        ret = lock_sem(pool->semid, index);
        if (-1 == ret) //error
        {
            int t = get_sem_value(pool->semid, index);
            printf("pid[%d] current semid index[%d] value is %d\n", getpid(), 1, t);
        }
        if (pool->shared_mem->stop == true) //接受退出
        {
            ptask_self->cur_task_status == IDLE_STATUS;
            break;
        }
        //read data
        st_data *task = (st_data *)ptask_self->data;
        if (task->slen >= 0)
        {
            printf("pid=%d len=%d data=%s\n", getpid(), task->slen, task->data);
            //if(task->slen <= 0) printf("pid=%d len=%d data=%s\n",getpid(),task->slen,task->data);
            //memset(task->data,0,sizeof(st_data));
            ptask_self->cur_task_status = IDLE_STATUS;
        }
    }
    exit(0);
    return 0;
}

void *manager_process(void *arg)
{
    int i = 0;
    int kill_rc = 0;
    process_pool *pool = (process_pool *)arg;
    task_st *ptask = NULL;

    if (pool == NULL)
    {
        pthread_exit(NULL);
    }
    int size = pool->size;
    ptask = pool->shared_mem->task_pro;

    while (1)
    {
        for (i = 0; i < size; i++)
        {
            if (pool->id[i].pid == -1)
            {
                set_sem_value(pool->semid, i, 0); //同步信号量  初始化一个进程
                ptask = ptask + i;
                ptask->index_pro = -1;
                ptask->init_success = -1;
                ptask->cur_task_status = UNKNEW_STATUS;
                memset(ptask->data, 0, sizeof(ptask->data));

                pool->id[i].pid = child_process(pool, i); //创建进程
                pool->id[i].index = i;
                while (ptask->init_success != 1)
                {
                    sleep(1);
                }
                ptask->cur_task_status = IDLE_STATUS;
                continue;
            }

            kill_rc = kill(pool->id[i].pid, 0);
            if (kill_rc == -1)
            {
                int error_num = errno;
                switch (error_num)
                {
                case EINVAL:
                    printf("An invalid signal was specified\n");
                    break;
                case EPERM:
                    printf("The  process  does  not have permission to send the signal to any of the target processes.\n");
                    break;
                case ESRCH:
                    printf("the specified thread did not exists or already quit\n");
                    //kill(-(pool->id[i].pid), SIGTERM); //杀死子进程及子孙进程 然后再去建立新进程
                    //pool->id[i].pid = waitpid(null,pool->id[i].pid,NULL,0); //彻底消灭僵死进程

                    set_sem_value(pool->semid, i, 0); //同步信号量  初始化一个进程
                    ptask = ptask + i;
                    ptask->index_pro = -1;
                    ptask->init_success = -1;
                    ptask->cur_task_status = UNKNEW_STATUS;
                    memset(ptask->data, 0, sizeof(ptask->data));

                    pool->id[i].pid = child_process(pool, i); //创建进程
                    pool->id[i].index = i;
                    while (ptask->init_success != 1) // 等待创建完成
                    {
                        sleep(1);
                    }
                    ptask->cur_task_status = IDLE_STATUS; //这样才可以分配出去
                    break;
                }
            }
            else
            {
                ; //printf("the specified thread is alive\n");
            }
            sleep(1);
        }
        sleep(5);
    }
    pinrtf("exit manager_process pthread!\n");
    pthread_exit(NULL);
}

process_pool *create_process_tpool(int size) //size个进程数
{
    int i = 0, is_finish = 0;
    process_pool *pool = NULL;
    task_st *ptask = NULL;

    if (size <= 0)
    {
        fprintf(stderr, "size is 0 fail\n");
        return NULL;
    }
    pool = (process_pool *)malloc(sizeof(process_pool));
    if (pool == NULL)
    {
        goto lebel_end1;
    }
    pool->id = (process_id *)malloc(sizeof(process_id) * size);
    if (pool->id == NULL)
    {
        //remove shm
        fprintf(stderr, "malloc id fail\n");
        goto lebel_end2;
    }
    pool->size = size;
    pool->shm_key = SHM_KEY;
    pool->shmid = shmget(pool->shm_key, sizeof(shared_st) + sizeof(task_st) * (pool->size), 0666 | S_IRUSR | S_IWUSR | IPC_CREAT);
    if (pool->shmid == -1)
    {
        fprintf(stderr, "shmget fail\n");
        goto lebel_end3;
    }

    pool->shared_mem = (shared_st *)shmat(pool->shmid, NULL, 0);
    if (pool->shared_mem == (void *)-1)
    {
        fprintf(stderr, "shmat fail\n");
        goto lebel_end4;
    }

    pool->semid = create_sem(KEY_ID, size);
    pool->shared_mem->stop = false;
    pool->shared_mem->task_pro = (task_st *)(pool->shared_mem + sizeof(unsigned short));

    ptask = pool->shared_mem->task_pro;
    for (i = 0; i < size; i++) //初始化所有的共享内存的地址， 针对每个进程的资源进行初始化
    {
        set_sem_value(pool->semid, i, 0); //同步信号量
        ptask->index_pro = -1;
        ptask->init_success = -1;
        ptask->cur_task_status = IDLE_STATUS;
        memset(ptask->data, 0, sizeof(ptask->data));
        ptask = ptask + 1;
    }

    //start fork
    for (i = 0; i < size; i++) //分配size个进程并映射共享内存地址信息
    {
        pool->id[i].pid = child_process(pool, i); //创建fork子进程
        pool->id[i].index = i;
    }

    //init process finish
    while (1) //同步所有的进程初始化完成  等待所有的进程都获取了自己的index 和共享内存的首地址信息
    {
        ptask = pool->shared_mem->task_pro;
        is_finish = 0;
        for (i = 0; i < size; i++)
        {
            if (ptask->init_success == 1)
            {
                ++is_finish;
            }
            ptask = ptask + 1;
        }
        if (is_finish == size) //标识共享内存所有的进程已经完后了首地址映射和index的获取
            break;
    }

    pool->manager_pthread_id = -1;
    if (pthread_create(&(pool->manager_pthread_id), NULL, manager_process, (void *)pool) != 0)
    {
        printf("pthread_create create manager_process error!\n");
    }
    //sleep(4);
    /*
	ptask = pool->shared_mem->task_pro;
	for(i = 0; i < size; i++)
	{
		
		//ptask->index_pro = -1;
		//ptask->cur_task_status = IDLE_STATUS;
		printf("test->pid:%d index=%d %p %d %d\n",pool->id[i].pid,pool->id[i].index, ptask,ptask->index_pro,ptask->cur_task_status);
		ptask = ptask + 1;
	}*/
    return pool;

lebel_end4:
    shmctl(pool->shmid, IPC_RMID, 0); //删除清除 贡献内存 ，系统回收掉
lebel_end3:
    free(pool->id);
lebel_end2:
    free(pool);
lebel_end1:
    return NULL;
}

int destroy_process_tpool(process_pool *pool)
{
    int status = 0;
    int i, ret;
    task_st *ptask = NULL;
    if (pool == NULL || pool->shared_mem == NULL)
    {
        return -1;
    }
    int size = pool->size;
    for (i = 0; i < size; i++)
    {
        if (pool->id[i].pid == -1)
        {
            continue;
        }
        ret = waitpid(pool->id[i].pid, NULL, 0);
        /*
		do 
		{
		   ret = waitpid(pool->id[i].pid, &status, WUNTRACED | WCONTINUED);
		   if (ret == -1) { perror("waitpid"); exit(EXIT_FAILURE); }

		   if(WIFEXITED(status)) 
		   {
			   printf("exited, status=%d\n", WEXITSTATUS(status));
		   }
		   else if (WIFSIGNALED(status)) 
		   {
			   printf("killed by signal %d\n", WTERMSIG(status));
		   } 
		   else if (WIFSTOPPED(status)) 
		   {
			   printf("stopped by signal %d\n", WSTOPSIG(status));
		   } 
		   else if (WIFCONTINUED(status)) 
		   {
			   printf("continued\n");
		   }
	   }while(!WIFEXITED(status) && !WIFSIGNALED(status));
		*/
    }
    //wait(&status);
    shmctl(pool->shmid, IPC_RMID, 0);
    free(pool->id);
    free_sem(pool->semid);
    free(pool);
    return 0;
}

int stop_process_tpool(process_pool *pool)
{
    int i, ret, num = 0;
    task_st *ptask = NULL;
    if (pool == NULL || pool->shared_mem == NULL)
    {
        return -1;
    }
    int size = pool->size;

    while (1) //等待所有的进程都是空闲状态时，才可以退出
    {
        num = 0;
        ptask = pool->shared_mem->task_pro;
        for (i = 0; i < size; i++)
        {
            if (pool->id[i].pid == -1)
            {
                ptask = ptask + 1;
                ++num;
                continue;
            }
            if (ptask->cur_task_status == IDLE_STATUS)
            {
                ++num;
            }
            //printf("pid %d  ptask->cur_task_status = %d\n",pool->id[i].pid,ptask->cur_task_status);
            ptask = ptask + 1;
        }
        if (num == size)
            break;
        //printf("num %d  size %d\n",num,size);
        //sleep(1);
    }

    if (pool->manager_pthread_id != -1)
    {
        ret = pthread_cancel(pool->manager_pthread_id);
        if (ret < 0)
        {
            int error_num = errno;
            if (ESRCH == error_num)
            {
                printf("the specified thread did not exists or already quit\n");
            }
        }
        else
        {
            //printf("pthread_cancel %p success!\n",pool->manager_pthread_id);
        }
        ret = pthread_join(pool->manager_pthread_id, NULL);
        if (ret < 0)
        {
        }
        else
        {
            //printf("pthread_join %p success!\n",pool->manager_pthread_id);
        }
    }

    pool->shared_mem->stop = true;
    ptask = pool->shared_mem->task_pro;
    for (i = 0; i < size; i++)
    {
        if (pool->id[i].pid == -1)
        {
            ptask = ptask + 1;
            continue;
        }
        if (ptask->cur_task_status == IDLE_STATUS)
        {
            //printf("pid %d  ptask->cur_task_status = %d\n",pool->id[i].pid,ptask->cur_task_status);
            set_sem_value(pool->semid, ptask->index_pro, 1);
        }
        ptask = ptask + 1;
    }
    return 0;
}

int distribute_task_tpool(process_pool *pool, st_data *data)
{
    int i, ret = 0, nret = -1;
    task_st *ptask = NULL;
    static int index = 0;
    if (pool == NULL || pool->shared_mem == NULL || data == NULL)
    {
        return -1;
    }

    int size = pool->size;
    ptask = pool->shared_mem->task_pro;

    //for(i = 0; i < size; i++)
    while (1)
    {
        i = index % size; //使用静态变量，循环到下一个进程，每个进程均摊任务
        ++index;
        if (index >= size) //代码优化
        {
            index = 0;
        }
        //index = index >= size ?  0 : index;

        if (pool->id[i].pid == -1)
        {
            continue;
        }

        if (ptask[i].cur_task_status == IDLE_STATUS)
        {
            ptask[i].cur_task_status = BUSY_STATUS;
            memcpy(ptask[i].data, data, sizeof(st_data));
            ret = set_sem_value(pool->semid, ptask[i].index_pro, 1);
            if (-1 == ret)
            {
                ptask[i].cur_task_status = IDLE_STATUS;
                int t = get_sem_value(pool->semid, 1);
                printf("pid[%d] current semid index[%d] value is %d\n", getpid(), ptask[i].index_pro, t);
                return -1;
            }
            nret = ptask[i].index_pro;
            break;
        }
    }
    return nret;
}

int main(int argc, char *argv[])
{
    int i = 0, ret;
    process_pool *pool = NULL;
    pool = create_process_tpool(30);

    int m = 0;
    while (1)
    {
        if (m == 1000)
            break;
        ++m;
        st_data *array[1000];
        for (i = 0; i < 1000; i++)
        {
            array[i] = (st_data *)malloc(sizeof(st_data));
            array[i]->slen = sprintf(array[i]->data, "task %d", i);
            ret = distribute_task_tpool(pool, array[i]);
            if (ret == -1)
            {
                free(array[i]);
                i--;
            }
            //printf("deal index = %d\n",ret);
        }
        for (i = 0; i < 1000; i++)
        {
            free(array[i]);
        }
    }
    /*
	int n=0;
	for(n=0;n<5;n++)
	{
		sleep(1);
		task_st *ptask = NULL;
		ptask = pool->shared_mem->task_pro;
		for(i = 0; i < pool->size; i++)
		{
			printf("test4 pid:%d index=%d %p %d %d\n",pool->id[i].pid,pool->id[i].index, ptask,ptask->index_pro,ptask->cur_task_status);
			ptask = ptask + 1;
		}	
	}
	*/
    //sleep(12);
    stop_process_tpool(pool);
    destroy_process_tpool(pool);
    return 0;
}

//gcc -g -o main shmtpool.c base-sem.c -I./ -lpthread

/*
valgrind  --leak-check=full ./main 


查看所有的进程
 top -bn1
 
 居然有僵尸的 T_T ~_~ .........................
 
9426 root      20   0     0    0    0 Z  0.0  0.0   0:00.00 main <defunct>                                                      
 9427 root      20   0     0    0    0 Z  0.0  0.0   0:00.02 main <defunct>                                                      
 9428 root      20   0     0    0    0 Z  0.0  0.0   0:00.00 main <defunct>                                                      
 9429 root      20   0     0    0    0 Z  0.0  0.0   0:00.01 main <defunct>                                                      
 9430 root      20   0     0    0    0 Z  0.0  0.0   0:00.00 main <defunct>                                                      
 9431 root      20   0     0    0    0 Z  0.0  0.0   0:00.00 main <defunct>                                                      
 9432 root      20   0     0    0    0 Z  0.0  0.0   0:00.00 main <defunct>                                                      
 9433 root      20   0     0    0    0 Z  0.0  0.0   0:00.00 main <defunct>     

*/
